import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class UDF2
{

    /**
     * Demo job: registers the scalar UDF {@code TestScalarFunc} under the name
     * "test" and invokes it from SQL (with 0, 1 and 3 arguments) over a
     * single-row stream exposed as table {@code t} with columns a, b, c.
     */
    public static void main(String[] args) throws Exception
    {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // One-shot source that emits a single Row(2, 3, 3) and then finishes.
        DataStream<Row> source = env.addSource(new RichSourceFunction<Row>()
        {
            // Flink source contract: cancel() must make run() return promptly.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Row> ctx) throws Exception
            {
                if (running)
                {
                    // Row.of(...) is the concise factory for positional rows.
                    ctx.collect(Row.of(2, 3, 3));
                }
            }

            @Override
            public void cancel()
            {
                running = false;
            }
        }).returns(Types.ROW(Types.INT, Types.INT, Types.INT));

        // Register the stream as table "t" with column names a, b, c.
        tEnv.createTemporaryView("t", source, "a,b,c");

        // Register the scalar UDF so SQL can call it as test(...).
        tEnv.registerFunction("test", new TestScalarFunc());

        Table table = tEnv.sqlQuery("select test() as a,test(a) as b, test(a,b,c) as c from t");
        tEnv.toAppendStream(table, Row.class).print();

        // Without execute() the job graph is only built, never run — the source
        // never emits and print() produces no output.
        env.execute("UDF2 scalar function demo");
    }
}
